package com.joysuccess.alarms.interceptor;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import redis.clients.jedis.Jedis;

import java.util.Map;

@Slf4j
public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String,String> {
    private Jedis jedis = new Jedis("127.0.0.1",6379);
    @Override
    public ConsumerRecords<String,String> onConsume(ConsumerRecords<String,String> consumerRecords) {
        long latency = 0L;
        for(ConsumerRecord consumerRecord : consumerRecords){
            latency += (System.currentTimeMillis()-consumerRecord.timestamp());
        }
        try {
            jedis.incrBy("totalLatency", latency);
            //以下这段代码我认为其实可以不用在这里计算，可以在另外一个地方做计算得出结论，避免降低吞吐量
            long totalLatency = Long.parseLong(jedis.get("totalLatency"));
            long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage"));
            jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs));
        }catch (Exception e){
           log.info("error: AvgLatencyConsumerInterceptor.onConsume() msg:"+e.getMessage());
        }

        return consumerRecords;
    }

    @Override
    public void close() {

    }

    @Override
    public void onCommit(Map map) {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}
